package com.bw.ads;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/*
(1,192.168.0.1,fail,1558430842) A
(1,192.168.0.2,fail,1558430843) B
1,192.168.0.2,fail,1558430844
1,192.168.0.3,fail,1558430850
1,192.168.0.3,fail,1558430851
2,192.168.10.10,fail,1558430851
2,192.168.10.10,fail,1558430858
2,192.168.10.10,fail,1558430864
2,192.168.10.10,fail,1558430878
 */
public class FlinkTM5_T_02_弃用 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(fsEnv);

        tEnv.executeSql("create table dwd_trade_orders(" +
                "    orderInfo Map<STRING,STRING>," +
                "    orderDetail Map<STRING,STRING>," +
                "    skuInfo Map<STRING,STRING>," +
                "    userInfo Map<STRING,STRING>," +
                "    baseProvince Map<STRING,STRING>," +
                "    ts BIGINT," +
                "    rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000))," +
                "    WATERMARK FOR rowtime AS rowtime" +
                "    ) with (" +
                "  'connector' = 'kafka',  " +
                "  'topic' = 'dwd_trade_orders_yk1'," +
                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
                "  'properties.group.id' = 'flinktm5_t_02'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'earliest-offset')"
        );

//        tEnv.sqlQuery("select * from dwd_trade_orders").execute().print();

        Table t1=tEnv.sqlQuery("select " +
                " orderInfo['id'] as order_id," +
                " baseProvince['id'] as province_id," +
                " baseProvince['name'] as province_name," +
                " skuInfo['id'] as sku_id," +
                " skuInfo['name'] as sku_name," +
                " userInfo['id'] as user_id," +
                " cast(orderDetail['sku_num'] as BIGINT) as sku_num," +
                " rowtime" +
                " from dwd_trade_orders");
//
        tEnv.createTemporaryView("dwd_trade_orders_info",t1);
//
//        tEnv.sqlQuery("select * from dwd_trade_orders_info").execute().print();


        /**
         *   窗口    地区  sku_id  sku_num
         *   10-20  北京   10      200
         *   10-20  北京   11      600
         *   10-20  上海   10      500
         *   20-30  北京   10      300
         *   20-30  上海   20      300
         *   ---------------------------
         *  stt edt  p    sku_nums
         *   10-20  北京   2600
         *   10-20  上海   500
         *   20-30  北京   300
         *
         *  --------------------------
         *   10-20  北京  2600   1
         *   10-20  上海  500    1
         *   20-30  北京   300   1
         */
//        tEnv.sqlQuery("select * from (\n" +
//                " select *,row_number() over(partition by stt,edt,province_id order by skusums desc) rk \n" +
//                " from (\n" +
//                "  select *,sum(skusums) over(partition by stt,edt,province_id) provincesums \n" +
//                "  from (\n" +
//                "   select\n" +
//                "       date_format(TUMBLE_START(rowtime, '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt, \n" +
//                "       date_format(TUMBLE_END(rowtime, '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt,   \n" +
//                "       province_id,\n" +
//                "       sku_id,\n" +
//                "       sum(sku_num) skusums \n" +
//                "       from dwd_trade_orders_info \n" +
//                "       group by TUMBLE(rowtime, INTERVAL '10' SECOND),province_id,sku_id\n" +
//                "  )" +
//                " )" +
//                " where cast(skusums as double)/provincesums >=0.12 \n" +
//                ") where rk<=5").execute().print();
//


//        Table tt1 = tEnv.sqlQuery("select \n" +
//                "    province_id,\n" +
//                "    sku_id,\n" +
//                "    sum(sku_num) skunums\n" +
//                "from dwd_trade_orders_info group by province_id,sku_id");
//
//
//        tEnv.createTemporaryView("tt1",tt1);



        tEnv.sqlQuery("select *,sum(skunums) over (partition by province_id ) from (select \n" +
                "    province_id,\n" +
                "    sku_id,\n" +
                "    sum(sku_num) skunums\n" +
                "from dwd_trade_orders_info group by province_id,sku_id )").execute().print();

        /**
         *  sdt edt province_id  sku_id    sku_num
         *  10  20    北京         10      100
         *  10  20    北京         11      200
         *  10  20    北京         12      300
         *  10  20    上海         10      100
         *  10  20    上海         11      200
         *  10  20    上海         12      300
         *
         */
//        Table tt1=tEnv.sqlQuery(" select\n" +
//                "       TUMBLE_START(rowtime, '10' SECOND) stt, \n" +
//                "       TUMBLE_END(rowtime, '10' SECOND) edt,   \n" +
//                "       province_id,\n" +
//                "       sku_id,\n" +
//                "       sum(sku_num) skusums \n" +
//                "       from dwd_trade_orders_info \n" +
//                "       group by TUMBLE(rowtime, INTERVAL '10' SECOND),province_id,sku_id\n"
//                );
//

//        Table tt1 = tEnv.sqlQuery("select \n" +
//                "    TUMBLE_START(rowtime, INTERVAL '10' SECOND) stt,\n" +
//                "    TUMBLE_END(rowtime, INTERVAL '10' SECOND) edt,\n" +
//                "    province_id,\n" +
//                "    sku_id,\n" +
//                "    sum(sku_num) skunums\n" +
//                "from dwd_trade_orders_info\n" +
//                "group by TUMBLE(rowtime, INTERVAL '10' SECOND),province_id,sku_id");
//        tEnv.createTemporaryView("tt1",tt1);
////
//        tEnv.sqlQuery("select * from tt1").execute().print();


        /**
         *
         *  province_id   sku_num   sku_nums   sdt   edt
         *    北京          100       100       10    20
         *    北京          200       300       10    20
         *    北京          300       600        10   20
         */
//        Table tt2 = tEnv.sqlQuery("SELECT *, " +
//                " sum(skusums) OVER (" +
//                "   PARTITION BY province_id " +
//                "   ORDER BY stt " +
//                "   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" +
//                ") AS provincesums " +
//                "FROM tt1");
////
//        tEnv.createTemporaryView("tt2",tt2);
//
        /**
         *
         *
         *
         */
//        Table tt3 = tEnv.sqlQuery("select * from (SELECT * ," +
//                "    row_number() over(partition by stt,edt,province_id order by skusums desc) rk" +
//                "    FROM tt2 where cast(skusums as double)/provincesums >=0.12 )" +
//                " where rk<=5"
//             );
//
//        tEnv.createTemporaryView("tt3",tt3);
//
//        tEnv.sqlQuery("select * from tt3").execute().print();


        //RANGE BETWEENT INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
        //fsEnv.execute();

    }
}
